#include "threadpool.h"
#include "xthread.h"
#include <assert.h>

/*
 * Construct the pool: clamp the requested sizes, create the initial
 * workers and launch the monitor task.
 *
 * inisize  - requested number of initial workers. A negative value or one
 *            above MAX_INIT_SIZE falls back to MAX_INIT_SIZE; the result is
 *            then capped at the effective capacity.
 * capacity - requested upper bound on workers. A negative value or one
 *            above MAX_CAPA_SIZE falls back to MAX_CAPA_SIZE.
 */
ThreadPool::ThreadPool(int inisize, int capacity)
{
    _capacity = (capacity < 0 || capacity > MAX_CAPA_SIZE) ? MAX_CAPA_SIZE : capacity;
    _size     = (inisize < 0 || inisize > MAX_INIT_SIZE) ? MAX_INIT_SIZE : inisize;
    _size     = _size > _capacity ? _capacity : _size;

    _exit     = false;
    _pids     = 0;

    {
        /* every initial worker is queued on _idle under the pool mutex */
        std::lock_guard<std::mutex> locker(_mutex);
        for (int i = 0; i < _size; ++i)
        {
            /* pids are handed out sequentially, starting at 1 */
            uint32_t pid = ++_pids;
            xthread *t = new xthread(pid, &ThreadPool::dispatch, this, pid);

            /* stamp the entry with the current wall-clock time in milliseconds */
            t->set_pts(std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count());
            _idle.push(t);
        }
    }

    /*
     * Launch the monitor task. The call must stay outside assert():
     * assert() expands to nothing when NDEBUG is defined, which would
     * silently drop the launch from release builds.
     */
    const int rc = launch(entry, this);
    assert(0 == rc);
    (void)rc; /* rc is otherwise unused when NDEBUG is defined */
}

/*
 * Shut the pool down: raise the exit flag, wake every waiting worker, then
 * join and delete each xthread found on the idle and ready queues until
 * _size reaches zero. Prints the number of joined workers when done.
 */
ThreadPool::~ThreadPool()
{
    _exit = true;

    {
        /* NOTE(review): the 20 ms pause presumably lets workers settle before the wake-up - confirm */
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.notify_all();
    }

    int jcnt = 0;
    while (_size > 0)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            while (!_idle.empty())
            {
                xthread *t = _idle.front();

                if (!t->joinable())
                {
                    /* back off WITHOUT the mutex so workers that need it can make progress */
                    lock.unlock();
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                    lock.lock();
                    continue;
                }

                /* the join flag stops monitor()/dispatch() from detaching this entry */
                t->set_join(true);

                /* the worker needs _mutex to leave dispatch(), so release it while joining */
                lock.unlock();
                t->join();
                lock.lock();

                delete t;
                _idle.pop();
                jcnt++;
                _size--;
            }

            while (!_ready.empty())
            {
                xthread *t = _ready.front();

                if (!t->joinable())
                {
                    /* back off WITHOUT the mutex so workers that need it can make progress */
                    lock.unlock();
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                    lock.lock();
                    continue;
                }

                t->set_join(true);
                lock.unlock();
                t->join();
                lock.lock();

                if (!_ready.empty() && _ready.front() == t)
                {
                    /* the worker exited without consuming its entry */
                    _ready.pop();
                }
                else
                {
                    /*
                     * While the mutex was released the worker took its entry
                     * off _ready, ran it and (as dispatch() does after every
                     * task) re-queued it on _idle. Remove it from _idle,
                     * keeping the other entries in order, so no dangling
                     * pointer is left behind once t is deleted.
                     */
                    for (auto left = _idle.size(); left > 0; --left)
                    {
                        xthread *other = _idle.front();
                        _idle.pop();
                        if (other != t)
                        {
                            _idle.push(other);
                        }
                    }
                }

                delete t;
                jcnt++;
                _size--;
            }
        }

        /* workers still running a task re-queue themselves later; poll again shortly */
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    printf("join count: %d\n", jcnt);
}

int
ThreadPool::launch(void* (*routine)(void *, int), void *data, int id)
{
    /* std::lock_guard<std::mutex> locker(_mutex); */
    std::unique_lock<std::mutex> lock(_mutex);
    if (_idle.empty())
    {
        if (_size >= _capacity)
        {
            /* printf("out of capacity!\n"); */
            return -1;
        }

        uint32_t pid = ++_pids;
        xthread *t = new xthread(pid, &ThreadPool::dispatch, this, pid);
        t->set_pts(std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count());
        _idle.push(t);
        _size++;
    }

    xthread *t = _idle.front();

    t->set_entry(routine, data, id);

    _ready.push(t);
    _idle.pop();
    _cv.notify_all();
    /* _cv.notify_one(); */

    return 0;
}

/*
 * Worker loop handed to every xthread the pool creates.
 *
 * pid - identifier of the xthread this loop serves; only queue entries
 *       whose pid() matches are consumed or torn down here.
 *
 * Each pass waits until _ready is non-empty, takes the front entry if it
 * belongs to this worker, runs its routine, then re-queues the entry on
 * _idle. The loop ends when _exit is observed, or when this worker's idle
 * entry has been flagged for detach.
 */
void
ThreadPool::dispatch(uint32_t pid)
{
    for (;;)
    {
        xthread *t = NULL;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            while (0 == _ready.size())
            {
                if (_exit)
                {
                    return;
                }
                _cv.wait(lock);
                if (_exit)
                {
                    return;
                }

                /*
                 * Retire this worker when its own entry sits at the front of
                 * _idle, is flagged for detach and is not being joined.
                 */
                if (!_idle.empty()
                        && _idle.front()->pid() == pid
                        && !_idle.front()->is_join()
                        && _idle.front()->is_detach())
                {
                    /* cast so the arguments really are the long that %ld expects */
                    printf("%ld, %ld\n",
                           static_cast<long>(_idle.front()->pid()),
                           static_cast<long>(pid));
                    _idle.front()->detach();
                    delete _idle.front();
                    _idle.pop();
                    _size--;
                    return;
                }
            }

            if (_ready.front()->pid() == pid)
            {
                t = _ready.front();
                _ready.pop();
            }
            else if (_exit)
            {
                /*
                 * The front entry belongs to another worker and the pool is
                 * shutting down. That worker may already have exited, so
                 * spinning here could keep this thread alive forever and
                 * hang the join in the destructor.
                 */
                return;
            }
        }

        if (NULL == t)
        {
            /*
             * The front task is someone else's: back off with the mutex
             * released so its owner can actually take it.
             */
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }

        /* run the task outside the lock */
        t->func()(t->data(), t->id());

        std::unique_lock<std::mutex> lock(_mutex);

        /* refresh the timestamp (milliseconds), clear the task and go back to idle */
        uint64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
        t->set_pts(now);
        t->set_entry(NULL, NULL, -1);

        _idle.push(t);

        if (_exit)
        {
            break;
        }
    }
}

void
ThreadPool::monitor()
{
    uint64_t cnt = 0;
    uint64_t len = 50;
    /* std::this_thread::sleep_for(std::chrono::milliseconds(20000)); */
    while (!_exit)
    {
        do
        {
            std::unique_lock<std::mutex> lock(_mutex);
            if (_idle.empty())
            {
                break;
            }

            if (_idle.front()->is_join())
            {
                break;
            }

            uint64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
            if (now - _idle.front()->pts() < 1000)
            {
                break;
            }

            _idle.front()->set_detach(true);
            _cv.notify_all();
        }
        while (0);

        {
            if (0 == (cnt % 2000))
            {
                std::lock_guard<std::mutex> locker(_mutex);
                printf("total size: %ld, idle size: %ld, ready size: %ld, busy size: %ld\n",
                       _size, _idle.size(), _ready.size(),
                       _size - (_idle.size() + _ready.size())
                      );
            }
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(len));
        cnt += len;
    }
}

void *
ThreadPool::entry(void *data, int id)
{
    ThreadPool *pool = (ThreadPool *)data;
    pool->monitor();
    return NULL;
}
